## Kafka (in a Docker container) + Spring.
This post is a refined version of the getting-started guide from the Spring documentation, adjusted to make sure it runs successfully on localhost.
>Spring doc getting started: https://docs.spring.io/spring-kafka/docs/current/reference/html/#getting-started
## Architecture
```plantuml
@startuml
rectangle host {
  rectangle vm {
    rectangle container as zkc {
      rectangle zookeeper as zk
    }
    rectangle container as kfc {
      rectangle kafka as kf
    }
    zk <.left.> kf
  }
  rectangle consumer as cs
  rectangle producer as pd
  pd --> kfc : push message
  kfc --> cs : consume message
}
@enduml
```
>run kafka in docker: https://www.confluent.io/blog/kafka-client-cannot-connect-to-broker-on-aws-on-docker-etc/
As shown in the architecture above, I have a VM running inside my host.
To run the `kafka` and `zookeeper` containers, use the following `docker-compose.yml`:
```yaml
version: "2"
services:
zookeeper:
image: docker.io/bitnami/zookeeper:3.7
ports:
- "2181:2181"
volumes:
- "zookeeper_data:/bitnami"
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
kafka:
image: docker.io/bitnami/kafka:3
ports:
- "9092:9092"
- "9093:9093"
volumes:
- "kafka_data:/bitnami"
environment:
- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
- ALLOW_PLAINTEXT_LISTENER=yes
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT
- KAFKA_CFG_LISTENERS=CLIENT://:9092,EXTERNAL://:9093
- KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka:9092,EXTERNAL://kafka-docker-machine-dev:9093
- KAFKA_CFG_INTER_BROKER_LISTENER_NAME=CLIENT
depends_on:
- zookeeper
volumes:
zookeeper_data:
driver: local
kafka_data:
driver: local
```
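With that file in place, bring the stack up in the usual way (this assumes the `docker-compose.yml` sits in the current directory on the VM):
```text
$ docker-compose up -d
```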
For `KAFKA_CFG_LISTENERS`, we have two listeners, `CLIENT` and `EXTERNAL`. A Kafka broker can have one or more listeners attached to it. Now let's start from scratch: without `KAFKA_CFG_LISTENERS` configured as above, the broker running inside the Docker container would have only one listener. When we connect to the VM's IP on port `9092`, the Kafka broker returns metadata describing the broker that the producer/consumer should write to or read from. In the Docker case, that metadata contains the Kafka container's ID instead of a resolvable hostname.
```text
$ docker ps
CONTAINER ID IMAGE .....
ab0fa12a3422 bitnami/kafka:3 .....
```
like `ab0fa12a3422`. Hence, Spring has no way to know where `ab0fa12a3422:9092` is, unless I map `ab0fa12a3422` to the VM's IP in my host's hosts file; only then can the host resolve `ab0fa12a3422` correctly and Spring can connect to the broker. Unfortunately, the Docker container ID keeps changing, so we have to deal with this problem, and `KAFKA_CFG_LISTENERS` is the answer.
When Spring initiates a connection to a broker with only the default listener, the broker returns the Docker container ID in its metadata. But if we configure Kafka as in the docker-compose file above, we get to choose which listener to talk to: the `EXTERNAL` one. No matter what the container ID turns out to be, the `EXTERNAL` listener will always advertise `kafka-docker-machine-dev`, as configured. Therefore, I just have to map `kafka-docker-machine-dev` to the VM's IP in my host's hosts file.
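For reference, the hosts entry is just a plain name-to-address mapping; the IP below is only a placeholder, use whatever address your VM actually has:
```text
# hosts file on the host machine (the IP is only an example)
192.168.56.10   kafka-docker-machine-dev
```
To double-check what the broker advertises on the `EXTERNAL` listener, here is a minimal sketch using Kafka's `AdminClient` (the class name `ListenerCheck` is mine, not part of the original setup):
```java
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

import java.util.Map;

public class ListenerCheck {

    public static void main(String[] args) throws Exception {
        // Bootstrap against the EXTERNAL listener; the broker replies with the
        // address it advertises for that listener in the cluster metadata.
        try (AdminClient admin = AdminClient.create(
                Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-docker-machine-dev:9093"))) {
            admin.describeCluster().nodes().get()
                 .forEach(node -> System.out.println(node.host() + ":" + node.port()));
            // With KAFKA_CFG_ADVERTISED_LISTENERS configured as above, this should print
            // kafka-docker-machine-dev:9093 instead of the container id.
        }
    }
}
```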
---
## Spring Boot app.
Add the `Spring for Apache Kafka` dependency when bootstrapping the project from Spring Initializr.
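If you are adding it to an existing build instead, the dependency coordinates are `org.springframework.kafka:spring-kafka`; with the Spring Boot parent/BOM managing the version, the Maven snippet looks like this:
```xml
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
```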
### Consumer
The `KafkaConsumerApplication.java`
```java
package com.kone.sandbox.kafkaconsumer;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.TopicBuilder;

@Slf4j
@SpringBootApplication
public class KafkaConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaConsumerApplication.class, args);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("topic1").partitions(10).replicas(1).build();
    }

    @KafkaListener(id = "myId", topics = "topic1")
    public void listen(String in) {
        log.info(in);
    }
}
```
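As a side note, `@KafkaListener` methods can also receive the full `ConsumerRecord` instead of just the value, which exposes the key, partition, and offset. A small sketch of such a variant (the id `myRecordId` is made up, and it needs an extra import of `org.apache.kafka.clients.consumer.ConsumerRecord`):
```java
// Hypothetical variant of the listener above.
@KafkaListener(id = "myRecordId", topics = "topic1")
public void listenRecord(ConsumerRecord<String, String> record) {
    log.info("partition={} offset={} value={}",
            record.partition(), record.offset(), record.value());
}
```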
`application.yml`
```yaml
spring:
  kafka:
    consumer:
      auto-offset-reset: earliest
      properties:
        bootstrap:
          servers: "kafka-docker-machine-dev:9093"
    admin:
      properties:
        bootstrap:
          servers: "kafka-docker-machine-dev:9093"
```
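The nested `properties.bootstrap.servers` keys above are passed straight through to the Kafka client as the raw `bootstrap.servers` property. Spring Boot also offers the shorthand `spring.kafka.bootstrap-servers`, which applies to the consumer, producer, and admin clients at once; a sketch of the equivalent configuration:
```yaml
spring:
  kafka:
    bootstrap-servers: "kafka-docker-machine-dev:9093"
    consumer:
      auto-offset-reset: earliest
```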
### Producer
`KafkaProducerApplication.java`
```java
package com.kone.sandbox.kafkaproducer;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.KafkaTemplate;

import java.time.LocalTime;

@Slf4j
@SpringBootApplication
public class KafkaProducerApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaProducerApplication.class, args);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("topic1").partitions(10).replicas(1).build();
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> {
            template.send("topic1", "test:" + LocalTime.now());
        };
    }
}
```
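The runner fires a single message at startup and ignores the result. If you want to log whether the send actually reached the broker, here is a sketch of a variant runner (the bean name `loggingRunner` is mine), assuming Spring for Apache Kafka 3.x, where `KafkaTemplate.send()` returns a `CompletableFuture`; on 2.x it returns a `ListenableFuture` instead:
```java
@Bean
public ApplicationRunner loggingRunner(KafkaTemplate<String, String> template) {
    return args -> template.send("topic1", "test:" + LocalTime.now())
            // whenComplete is available on the CompletableFuture returned by 3.x
            .whenComplete((result, ex) -> {
                if (ex != null) {
                    log.error("send failed", ex);
                } else {
                    log.info("sent to partition {} offset {}",
                            result.getRecordMetadata().partition(),
                            result.getRecordMetadata().offset());
                }
            });
}
```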
`application.yml` (note that `auto-offset-reset` is a consumer setting, so it is not needed here)
```yaml
spring:
  kafka:
    producer:
      properties:
        bootstrap:
          servers: "kafka-docker-machine-dev:9093"
    admin:
      properties:
        bootstrap:
          servers: "kafka-docker-machine-dev:9093"
```
**Result**
